2. Kaggle Competition: Optiver Realized Volatility Prediction (1)

1. Understanding the Competition Data

The host, Optiver, provides a notebook, Introduction to financial concepts and data, that introduces the dataset and basic stock-trading concepts.

Here I will only restate the handful of formulas it gives:

  1. Bid/ask spread, which reflects the stock's liquidity:

$$\text{BidAskSpread} = \frac{\text{BestOffer}}{\text{BestBid}} - 1$$

  2. Weighted averaged price (WAP), an estimate of the stock's fair value:

$$\text{WAP} = \frac{\text{BidPrice}_1 \cdot \text{AskSize}_1 + \text{AskPrice}_1 \cdot \text{BidSize}_1}{\text{BidSize}_1 + \text{AskSize}_1}$$

  3. Log return, the change in the stock's price over some interval:

$$r_{t_1, t_2} = \log\left(\frac{S_{t_2}}{S_{t_1}}\right)$$

where $t_1$ and $t_2$ are two points in time, e.g. yesterday and today.

  4. Realized volatility:

$$\sigma = \sqrt{\sum_{t} r_{t-1, t}^2}$$

In this competition, you will be given 10 minutes of book data and we ask you to predict what the volatility will be in the following 10 minutes.
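As a quick sanity check on formulas 3 and 4, here is a toy computation (illustrative only, not from the baseline notebook) of log returns and realized volatility for a short price series:

import numpy as np
import pandas as pd

prices = pd.Series([1.0012, 1.0014, 1.0013, 1.0016])
log_ret = np.log(prices).diff()         # formula 3: log(S_t2) - log(S_t1); the first value is NaN
rv = np.sqrt(np.nansum(log_ret ** 2))   # formula 4: realized volatility over the window
print(rv)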

Next I will walk through the baseline following alexioslyon's lgbm-baseline notebook. All code is explained in call order, which differs somewhat from the order in the source notebook.

Also, the DataFrame screenshots are mostly partial views, since there are a great many feature columns.

2. Feature Engineering on the book and trade Data

1. Reading the train/test data
def read_train_test_data():
    """Read the raw train/test csv data."""
    train = pd.read_csv(data_dir + 'train.csv')
    test = pd.read_csv(data_dir + 'test.csv')
    # Build the new key column row_id = "<stock_id>-<time_id>"
    train['row_id'] = train['stock_id'].astype(str) + '-' + train['time_id'].astype(str)
    test['row_id'] = test['stock_id'].astype(str) + '-' + test['time_id'].astype(str)
    print(f'Our training set has {train.shape[0]} rows')
    return train, test

This simply reads train and test and builds the new row_id column, producing a df like the one below.

[figure: train.head() with the new row_id column]

Now we can get the unique stock ids with train_stock_id = train['stock_id'].unique().

[figure: array of unique stock_id values]

2. The Core Preprocessing Method

It is invoked as follows, using preprocessor(list_stock_ids, is_train=True) defined below:

train_stock_id = train['stock_id'].unique()
train_ = preprocessor(train_stock_id, is_train=True)
train = train.merge(train_, on=['row_id'], how='left')

file_path_book corresponds to a path like the one shown below:

[figure: directory listing of book_train.parquet/stock_id=* folders]

So far execution has only reached the line inside preprocessor that calls book_preprocessor(file_path_book) and trade_preprocessor(file_path_trade). Now we need to look at what book_preprocessor and friends actually do.

def preprocessor(list_stock_ids, is_train=True):
    # Process each stock in parallel
    def for_joblib(stock_id):
        if is_train:
            # training data paths
            file_path_book = data_dir + r'book_train.parquet/stock_id=' + str(stock_id)
            file_path_trade = data_dir + r'trade_train.parquet/stock_id=' + str(stock_id)
        else:
            file_path_book = data_dir + r'book_test.parquet/stock_id=' + str(stock_id)
            file_path_trade = data_dir + r'trade_test.parquet/stock_id=' + str(stock_id)

        # Preprocess the book and trade data, then merge them
        df_tmp = pd.merge(book_preprocessor(file_path_book), trade_preprocessor(file_path_trade), on='row_id', how='left')
        return df_tmp

    # Run for_joblib in parallel, one job per stock_id
    df = Parallel(n_jobs=-1, verbose=1)(delayed(for_joblib)(stock_id) for stock_id in list_stock_ids)
    df = pd.concat(df, ignore_index=True)
    return df
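For completeness, here is a minimal sketch of the imports and globals the snippets so far assume (the data_dir path and SEED value are illustrative, adjust them to your environment):

import numpy as np
import pandas as pd
from joblib import Parallel, delayed

data_dir = '../input/optiver-realized-volatility-prediction/'  # hypothetical path
SEED = 42  # hypothetical seed, used later by KMeans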
3. book_preprocessor Line by Line

The df read inside book_preprocessor looks like this:

df1 = pd.read_parquet('/data/book_train.parquet/stock_id=0')
df1.head()

This corresponds to the file_path_book built in the is_train branch of preprocessor. Let's look at the df before book_preprocessor touches it.

[figure: raw book df for stock_id=0]

As shown above, the df loaded from book_train.parquet/stock_id=0 has feature columns such as 'time_id', 'ask_price1', and so on.

def book_preprocessor(file_path):
    """Preprocess the book data and build all of the book features."""
    df = pd.read_parquet(file_path)
    df['wap1'] = calc_wap1(df)
    df['wap2'] = calc_wap2(df)
    df['wap3'] = calc_wap3(df)
    df['wap4'] = calc_wap4(df)

    # Log returns: compute the log return of each wap within each time_id
    df['log_return1'] = df.groupby(['time_id'])['wap1'].apply(log_return)
    df['log_return2'] = df.groupby(['time_id'])['wap2'].apply(log_return)
    df['log_return3'] = df.groupby(['time_id'])['wap3'].apply(log_return)
    df['log_return4'] = df.groupby(['time_id'])['wap4'].apply(log_return)

    # wap balance: absolute gap between the two weighted average prices
    df['wap_balance'] = abs(df['wap1'] - df['wap2'])

    # price_spread = (ask_p1 - bid_p1) / ((ask_p1 + bid_p1) / 2)
    df['price_spread'] = (df['ask_price1'] - df['bid_price1']) / ((df['ask_price1'] + df['bid_price1']) / 2)
    df['price_spread2'] = (df['ask_price2'] - df['bid_price2']) / ((df['ask_price2'] + df['bid_price2']) / 2)

    # bid/ask spreads: level-1 price minus level-2 price
    df['bid_spread'] = df['bid_price1'] - df['bid_price2']
    df['ask_spread'] = df['ask_price1'] - df['ask_price2']
    df['bid_ask_spread'] = abs(df['bid_spread'] - df['ask_spread'])

    # total volume over ask/bid size1/2
    df['total_volume'] = (df['ask_size1'] + df['ask_size2']) + (df['bid_size1'] + df['bid_size2'])
    df['volume_imbalance'] = abs((df['ask_size1'] + df['ask_size2']) - (df['bid_size1'] + df['bid_size2']))

    # Aggregation dict: summarize each feature by sum, std, etc.
    create_feature_dict = {
        'wap1': [np.sum, np.std],
        'wap2': [np.sum, np.std],
        'wap3': [np.sum, np.std],
        'wap4': [np.sum, np.std],
        'log_return1': [realized_volatility],
        'log_return2': [realized_volatility],
        'log_return3': [realized_volatility],
        'log_return4': [realized_volatility],
        'wap_balance': [np.sum, np.max],
        'price_spread': [np.sum, np.max],
        'price_spread2': [np.sum, np.max],
        'bid_spread': [np.sum, np.max],
        'ask_spread': [np.sum, np.max],
        'total_volume': [np.sum, np.max],
        'volume_imbalance': [np.sum, np.max],
        'bid_ask_spread': [np.sum, np.max],
    }

    # Aggregation dict for the time windows: realized volatility only
    create_feature_dict_time = {
        'log_return1': [realized_volatility],
        'log_return2': [realized_volatility],
        'log_return3': [realized_volatility],
        'log_return4': [realized_volatility],
    }

    def get_stats_window(fe_dict, seconds_in_bucket, add_suffix=False):
        """Aggregate the fe_dict features by time_id over a window (bucket = time window)."""
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket] \
            .groupby(['time_id']).agg(fe_dict).reset_index()

        df_feature.columns = ['_'.join(col) for col in df_feature.columns]  # flatten the column names
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature

    # Statistics over different time windows
    df_feature = get_stats_window(create_feature_dict, seconds_in_bucket=0, add_suffix=False)
    df_feature_500 = get_stats_window(create_feature_dict_time, seconds_in_bucket=500, add_suffix=True)
    df_feature_400 = get_stats_window(create_feature_dict_time, seconds_in_bucket=400, add_suffix=True)
    df_feature_300 = get_stats_window(create_feature_dict_time, seconds_in_bucket=300, add_suffix=True)
    df_feature_200 = get_stats_window(create_feature_dict_time, seconds_in_bucket=200, add_suffix=True)
    df_feature_100 = get_stats_window(create_feature_dict_time, seconds_in_bucket=100, add_suffix=True)

    # Merge the windowed features
    df_feature = df_feature.merge(df_feature_500, how='left', left_on='time_id_', right_on='time_id__500')
    df_feature = df_feature.merge(df_feature_400, how='left', left_on='time_id_', right_on='time_id__400')
    df_feature = df_feature.merge(df_feature_300, how='left', left_on='time_id_', right_on='time_id__300')
    df_feature = df_feature.merge(df_feature_200, how='left', left_on='time_id_', right_on='time_id__200')
    df_feature = df_feature.merge(df_feature_100, how='left', left_on='time_id_', right_on='time_id__100')
    # Drop the redundant key columns produced by the merges
    df_feature.drop(['time_id__500', 'time_id__400', 'time_id__300', 'time_id__200', 'time_id__100'], axis=1, inplace=True)

    stock_id = file_path.split('=')[1]
    # Rewrite 'row_id' into the "<stock_id>-<time_id_>" format
    df_feature['row_id'] = df_feature['time_id_'].apply(lambda x: f'{stock_id}-{x}')
    # Drop 'time_id_'
    df_feature.drop(['time_id_'], axis=1, inplace=True)
    return df_feature
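To try it out on a single stock and sanity-check the result, something like this works (the path is illustrative):

df_book_features = book_preprocessor(data_dir + 'book_train.parquet/stock_id=0')
print(df_book_features.shape)  # one row per time_id, plus the row_id column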

Let's walk through the feature construction step by step:

  1. The df['wap1'] family computes the WAPs, all using formula 2. They correspond to the four calc functions below (all the feature helper functions are pasted here):
def calc_wap1(df):
    """Weighted averaged price 1: level-1 prices, each weighted by the opposite side's size."""
    wap1 = (df['bid_price1'] * df['ask_size1'] + df['ask_price1'] * df['bid_size1']) \
        / (df['bid_size1'] + df['ask_size1'])
    return wap1

def calc_wap2(df):
    """Weighted averaged price 2: same formula on the level-2 quotes."""
    wap2 = (df['bid_price2'] * df['ask_size2'] + df['ask_price2'] * df['bid_size2']) \
        / (df['bid_size2'] + df['ask_size2'])
    return wap2

def calc_wap3(df):
    """Weighted averaged price 3: level-1 prices, each weighted by its own side's size."""
    wap3 = (df['bid_price1'] * df['bid_size1'] + df['ask_price1'] * df['ask_size1']) \
        / (df['bid_size1'] + df['ask_size1'])
    return wap3

def calc_wap4(df):
    """Weighted averaged price 4: level-2 variant of calc_wap3."""
    wap4 = (df['bid_price2'] * df['bid_size2'] + df['ask_price2'] * df['ask_size2']) \
        / (df['bid_size2'] + df['ask_size2'])
    return wap4

def log_return(series):
    """Diff of the log series: log(x/y) = log(x) - log(y)."""
    return np.log(series).diff()

def realized_volatility(series):
    """Realized volatility (formula 4)."""
    return np.sqrt(np.sum(series**2))

def count_unique(series):
    """Number of unique values in the series."""
    return len(np.unique(series))
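To make the wap1 vs. wap3 difference concrete, here is a toy level-1 quote (numbers invented for illustration). wap1 weights each price by the opposite side's size, so a heavy ask side pulls it toward the bid; wap3 weights each price by its own side's size and moves the other way:

quote = pd.DataFrame({'bid_price1': [0.999], 'ask_price1': [1.001],
                      'bid_size1': [100], 'ask_size1': [300]})
print(calc_wap1(quote))  # (0.999*300 + 1.001*100) / 400 = 0.9995
print(calc_wap3(quote))  # (0.999*100 + 1.001*300) / 400 = 1.0005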

For example, computing calc_wap1(df1) on the df1 above gives:

0         1.001434
1         1.001448
2         1.001448
...
917552    0.998517
Length: 917553, dtype: float32

After adding the four wap features:

[figure: df1.head() with the new wap1-wap4 columns]

  2. The df['log_return1'] family is likewise four features, computed with formula 3: the wap columns from the previous step are grouped by 'time_id' and the log_return below is applied, e.g. to 'wap1':
def log_return(series):
    """Diff of the log series: log(x/y) = log(x) - log(y)."""
    return np.log(series).diff()

For df1, computing the first one gives:

df1.groupby(['time_id'])['wap1'].apply(log_return)
====================================================
0              NaN
1         0.000014
2         0.000000
3        -0.000005
...
917551    0.000001
917552    0.000000
Name: wap1, Length: 917553, dtype: float32

df1 now gains another four log_return features.

Later in the code, these four columns are also aggregated with [realized_volatility] to build features (using formula 4):

def realized_volatility(series):
    """Realized volatility (formula 4)."""
    return np.sqrt(np.sum(series**2))
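To see what that aggregation yields per bucket, a small sketch (assuming df1 already carries the log_return1 column from step 2):

rv_per_bucket = df1.groupby(['time_id'])['log_return1'].agg(realized_volatility)
rv_per_bucket.head()  # one realized-volatility value per 10-minute time_id bucket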
  3. The remaining eight features such as 'wap_balance' and 'price_spread' are computed exactly as in the source above. After this processing, the trailing columns of df1.head() look like:

[figure: df1.head() trailing columns with the spread and volume features]

  4. Next comes the dict of aggregation statistics (np.sum, np.std, ...). Note the distinction from the later dict create_feature_dict_time, which only aggregates realized volatility over the time windows.
  5. Then the get_stats_window(fe_dict, seconds_in_bucket, add_suffix=False) method takes an aggregation dict as fe_dict (the dict from step 4 for the full bucket, create_feature_dict_time for the windows), keeps the rows whose 'seconds_in_bucket' is at least 0, 500, ..., aggregates the fe_dict statistics within that window, and calls reset_index().
    In the filter df[df['seconds_in_bucket'] >= seconds_in_bucket], seconds_in_bucket=0 selects all of df1 (every row satisfies seconds_in_bucket >= 0), and each feature's statistics are aggregated according to create_feature_dict, e.g.:

[figure: per-time_id aggregated statistics with a two-level column header]

Flattening the column names above:

df1_feature.columns = ['_'.join(col) for col in df1_feature.columns]
df1_feature.columns
====================================================
['wap1_sum', 'wap1_std', 'wap2_sum', 'wap2_std', 'wap3_sum', 'wap3_std',
 'wap4_sum', 'wap4_std', 'log_return1_realized_volatility', ...]

Add the optional suffix when requested, and that is all get_stats_window does.
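One detail worth pausing on: after reset_index(), the 'time_id' column's second header level is an empty string, so '_'.join(('time_id', '')) produces 'time_id_', and add_suffix then turns it into 'time_id__500', exactly the key names used in the merges below. A toy demonstration (illustrative data, reusing the realized_volatility helper defined above):

toy = pd.DataFrame({'time_id': [5, 5, 11], 'log_return1': [0.001, -0.002, 0.003]})
agg = toy.groupby(['time_id']).agg({'log_return1': [realized_volatility]}).reset_index()
agg.columns = ['_'.join(col) for col in agg.columns]
print(list(agg.columns))                     # ['time_id_', 'log_return1_realized_volatility']
print(list(agg.add_suffix('_500').columns))  # ['time_id__500', 'log_return1_realized_volatility_500']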

For the windows, e.g. df1['seconds_in_bucket'] >= 500, the features are built with create_feature_dict_time instead. Same operation, giving:

df1_feature = df1[df1['seconds_in_bucket'] >= 500].groupby(['time_id']).agg(create_feature_dict_time).head()
df1_feature

[figure: per-time_id realized-volatility aggregates for the >= 500s window]

Finally, the windowed volatilities look like:

df1_feature_500 = get_stats_window(create_feature_dict_time, 500, True)
df1_feature_500.head()

[figure: df1_feature_500.head() with the _500 suffix on every column]

  6. What remains is simply merging the features, dropping the unneeded columns, building the new 'row_id', and finally returning the whole feature df.
df1_feature = df1_feature.merge(df1_feature_500, how='left', left_on='time_id_', right_on='time_id__500')
df1_feature.head()

This left-joins df1_feature_500 onto the rows of df1_feature, keyed by 'time_id_' on the left and 'time_id__500' on the right, one window at a time.

[figure: merged df1_feature, leading columns]

The complete set of feature columns continues as:

[figure: merged df1_feature, trailing suffixed columns]

Next, kick out the unneeded key columns:

df1_feature.drop(['time_id__500', 'time_id__400', 'time_id__300', 'time_id__200', 'time_id__100'], axis=1, inplace=True)
df1_feature.head()

Getting stock_id is trivial, e.g. stock_id = str('/content/work/data/book_train.parquet/stock_id=0').split('=')[1], which here gives '0'.

So df1_feature['row_id'] ends up looking like 0-5, 0-11, i.e. stock_id-time_id_:

df1_feature['row_id'] = df1_feature['time_id_'].apply(lambda x: f'{stock_id}-{x}')
df1_feature.head()

The tail of the returned feature df looks like this:

[figure: tail columns of the returned book feature df, ending with row_id]

4. trade_preprocessor Line by Line

The first statement is again df = pd.read_parquet(file_path) (the test set only has stock_id=0). Let's inspect the data and compare it with the book data.

df2 = pd.read_parquet('/content/work/data/trade_train.parquet/stock_id=0')
df2.head()

[figure: raw trade df for stock_id=0]

Below is the whole trade_preprocessor; it is explained step by step afterwards.

def trade_preprocessor(file_path):
    df = pd.read_parquet(file_path)
    df['log_return'] = df.groupby('time_id')['price'].apply(log_return)
    df['amount'] = df['price'] * df['size']

    # dict for aggregations
    create_feature_dict = {
        'log_return': [realized_volatility],
        'seconds_in_bucket': [count_unique],
        'size': [np.sum, np.max, np.min],
        'order_count': [np.sum, np.max],
        'amount': [np.sum, np.max, np.min],
    }

    create_feature_dict_time = {
        'log_return': [realized_volatility],
        'seconds_in_bucket': [count_unique],
        'size': [np.sum],
        'order_count': [np.sum],
    }

    def get_stats_window(fe_dict, seconds_in_bucket, add_suffix=False):
        """Aggregate the fe_dict features by time_id over a window (bucket = time window)."""
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket] \
            .groupby(['time_id']).agg(fe_dict).reset_index()

        df_feature.columns = ['_'.join(col) for col in df_feature.columns]  # flatten the column names
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature

    # Statistics over different time windows
    df_feature = get_stats_window(create_feature_dict, seconds_in_bucket=0, add_suffix=False)
    df_feature_500 = get_stats_window(create_feature_dict_time, seconds_in_bucket=500, add_suffix=True)
    df_feature_400 = get_stats_window(create_feature_dict_time, seconds_in_bucket=400, add_suffix=True)
    df_feature_300 = get_stats_window(create_feature_dict_time, seconds_in_bucket=300, add_suffix=True)
    df_feature_200 = get_stats_window(create_feature_dict_time, seconds_in_bucket=200, add_suffix=True)
    df_feature_100 = get_stats_window(create_feature_dict_time, seconds_in_bucket=100, add_suffix=True)

    def tendency(price, vol):
        """Volume-weighted sum of percentage price changes."""
        df_diff = np.diff(price)  # price differences
        val = (df_diff / price[1:]) * 100
        power = np.sum(val * vol[1:])
        return power

    lis = []
    for n_time_id in df['time_id'].unique():
        # Compute tendency etc. for each unique time_id
        df_id = df[df['time_id'] == n_time_id]

        tendencyV = tendency(df_id['price'].values, df_id['size'].values)
        # f_max / f_min: number of prices above / below the mean price
        f_max = np.sum(df_id['price'].values > np.mean(df_id['price'].values))
        f_min = np.sum(df_id['price'].values < np.mean(df_id['price'].values))
        # df_max / df_min: number of upward / downward price moves
        df_max = np.sum(np.diff(df_id['price'].values) > 0)
        df_min = np.sum(np.diff(df_id['price'].values) < 0)

        # New features: median of abs('price' - mean('price')), etc.
        abs_diff = np.median(np.abs(df_id['price'].values - np.mean(df_id['price'].values)))
        energy = np.mean(df_id['price'].values**2)
        # interquartile range of 'price' (75th minus 25th percentile)
        iqr_p = np.percentile(df_id['price'].values, 75) - np.percentile(df_id['price'].values, 25)

        # The same variability features for volume ('size')
        abs_diff_v = np.median(np.abs(df_id['size'].values - np.mean(df_id['size'].values)))
        energy_v = np.sum(df_id['size'].values ** 2)
        iqr_p_v = np.percentile(df_id['size'].values, 75) - np.percentile(df_id['size'].values, 25)

        # Collect the features built above into lis as a dict
        lis.append({'time_id': n_time_id, 'tendency': tendencyV, 'f_max': f_max,
                    'f_min': f_min, 'df_max': df_max, 'df_min': df_min,
                    'abs_diff': abs_diff, 'energy': energy, 'iqr_p': iqr_p,
                    'abs_diff_v': abs_diff_v, 'energy_v': energy_v, 'iqr_p_v': iqr_p_v})

    # Turn the per-time_id feature dicts into a DataFrame
    df_lr = pd.DataFrame(lis)

    # Merge with df_lr
    df_feature = df_feature.merge(df_lr, how='left', left_on='time_id_', right_on='time_id')
    # Merge all the windowed features
    df_feature = df_feature.merge(df_feature_500, how='left', left_on='time_id_', right_on='time_id__500')
    df_feature = df_feature.merge(df_feature_400, how='left', left_on='time_id_', right_on='time_id__400')
    df_feature = df_feature.merge(df_feature_300, how='left', left_on='time_id_', right_on='time_id__300')
    df_feature = df_feature.merge(df_feature_200, how='left', left_on='time_id_', right_on='time_id__200')
    df_feature = df_feature.merge(df_feature_100, how='left', left_on='time_id_', right_on='time_id__100')

    df_feature.drop(['time_id__500', 'time_id__400', 'time_id__300', 'time_id__200',
                     'time_id', 'time_id__100'], axis=1, inplace=True)

    # Prefix every column with 'trade_'
    df_feature = df_feature.add_prefix('trade_')
    stock_id = file_path.split('=')[1]
    # Rewrite 'row_id' into the "<stock_id>-<trade_time_id_>" format
    df_feature['row_id'] = df_feature['trade_time_id_'].apply(lambda x: f'{stock_id}-{x}')
    # Drop 'trade_time_id_'
    df_feature.drop(['trade_time_id_'], axis=1, inplace=True)
    return df_feature
  1. First, two new features are built: 'log_return' (the log return of price within each time_id) and 'amount' (price times size). At this point the df looks like this:

[figure: trade df with the new log_return and amount columns]

  2. As in book_preprocessor, two aggregation dicts are built: create_feature_dict and create_feature_dict_time. The count_unique helper used in them is:
def count_unique(series):
    """Number of unique values in the series."""
    return len(np.unique(series))

For example:

df2_feature = df2[df2['seconds_in_bucket'] >= 0].groupby(['time_id']).agg(create_feature_dict).reset_index()
df2_feature.head()

[figure: per-time_id aggregation of df2 with create_feature_dict]

Then the column names are joined, and the _<seconds_in_bucket> suffix is appended.

df2_feature.columns = ['_'.join(col) for col in df2_feature.columns]
#df2_feature = df2_feature.add_suffix('_' + str(0))
df2_feature.head()

[figure: df2_feature with flattened column names]

  3. Next, the windowed statistics are built just as in book_preprocessor. For example:
df2_feature_500 = get_stats_window(create_feature_dict_time, 500, True)
df2_feature_500.head()

[figure: df2_feature_500.head()]

  4. tendency(price, vol) is computed from price and size:
def tendency(price, vol):
    df_diff = np.diff(price)
    val = (df_diff / price[1:]) * 100
    power = np.sum(val * vol[1:])
    return power

That is, it sums the percentage price change multiplied by the traded size.
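A tiny worked example with made-up numbers: a rise on moderate volume followed by a fall on heavier volume nets out negative.

prices = np.array([100.0, 101.0, 100.0])
sizes = np.array([10, 20, 30])
# diffs = [1, -1]; pct changes vs. the current price = [100/101, -1.0] percent
# tendency = (100/101)*20 + (-1.0)*30 ≈ 19.80 - 30.00 = -10.20
print(tendency(prices, sizes))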

  5. The trade features such as tendencyV are computed for each unique time_id and appended to the lis list as dicts; df_lr = pd.DataFrame(lis) then yields the new df.
    Concretely, one example df_id looks like this:
[i for i in df2['time_id'].unique()][:5]
===========================================
[5, 11, 16, 31, 62]

df2_id = df2[df2['time_id'] == 5]
df2_id.head()

[figure: df2 rows with time_id == 5]

In tendencyV = tendency(df_id['price'].values, df_id['size'].values), the price and size values are passed to tendency, which takes the difference from the previous seconds_in_bucket entry, turns it into a percentage change, multiplies by the corresponding size, and sums everything up. The other features are built similarly, the final lis list being converted into the new feature df df_lr.

You can wrap this whole part into a function taking df as input to inspect df_lr (the middle is elided for brevity):

def lis_df_lr(df):
    lis = []
    for n_time_id in df['time_id'].unique():
        ...
    df_lr = pd.DataFrame(lis)
    return df_lr

df2_lr = lis_df_lr(df2)
df2_lr.head()

[figure: df2_lr.head() with tendency, f_max, f_min, ... columns]

  6. First merge with df_lr, then merge all the windowed features df_feature_500, etc.
    From here it closely mirrors the book processing, so I only paste part of the processed features:
df2_feature = df2_feature.merge(df2_lr, how='left', left_on='time_id_', right_on='time_id')
df2_feature.head()

[figure: df2_feature after merging df2_lr]

  7. Finally, prefix the feature columns with 'trade_', build the new 'row_id', and return df_feature.
df2_feature = df2_feature.add_prefix('trade_')
df2_feature['row_id'] = df2_feature['trade_time_id_'].apply(lambda x:f'{str(0)}-{x}')
df2_feature.drop(['trade_time_id_'], axis = 1, inplace = True)
df2_feature.head()

Part of the final feature df:

[figure: final trade feature df, partial columns]

5. get_time_stock
def get_time_stock(df):
    """
    Group the realized_volatility-related columns of df by
    stock_id and by time_id, aggregate their statistics,
    then merge the results back into df.
    """
    vol_cols = ['log_return1_realized_volatility', 'log_return2_realized_volatility',
                'log_return1_realized_volatility_400', 'log_return2_realized_volatility_400',
                'log_return1_realized_volatility_300', 'log_return2_realized_volatility_300',
                'log_return1_realized_volatility_200', 'log_return2_realized_volatility_200',
                'trade_log_return_realized_volatility', 'trade_log_return_realized_volatility_400',
                'trade_log_return_realized_volatility_300', 'trade_log_return_realized_volatility_200',
                ]

    # Group by stock_id and aggregate mean/std/max/min of vol_cols
    df_stock_id = df.groupby(['stock_id'])[vol_cols].agg(['mean', 'std', 'max', 'min', ]).reset_index()
    # Rename columns joining suffix
    df_stock_id.columns = ['_'.join(col) for col in df_stock_id.columns]
    df_stock_id = df_stock_id.add_suffix('_' + 'stock')

    # Group by time_id and aggregate the same statistics of vol_cols
    df_time_id = df.groupby(['time_id'])[vol_cols].agg(['mean', 'std', 'max', 'min', ]).reset_index()
    # Rename columns joining suffix
    df_time_id.columns = ['_'.join(col) for col in df_time_id.columns]
    df_time_id = df_time_id.add_suffix('_' + 'time')

    # Merge back into the original df
    df = df.merge(df_stock_id, how='left', left_on=['stock_id'], right_on=['stock_id__stock'])
    df = df.merge(df_time_id, how='left', left_on=['time_id'], right_on=['time_id__time'])
    df.drop(['stock_id__stock', 'time_id__time'], axis=1, inplace=True)
    return df

In the actual pipeline, this is the calling code:

train, test = read_train_test_data()
# get unique stock ids
train_stock_id = train['stock_id'].unique()
train_ = preprocessor(train_stock_id, is_train=True)
train = train.merge(train_, on=['row_id'], how='left')

# get unique stock ids
test_stock_id = test['stock_id'].unique()
test_ = preprocessor(test_stock_id, is_train=False)
test = test.merge(test_, on=['row_id'], how='left')

train = get_time_stock(train)
test = get_time_stock(test)

Here, after loading the train and test csv data, we process by 'stock_id' to obtain train_ etc., and merge to get the full new-feature df built in sections 3 and 4.

Next we take the feature columns in the vol_cols list (these are the realized_volatility columns of df; I am not sure why some are left out, such as the log_return3/log_return4 ones and the _100 windows, perhaps to avoid redundant features or because too many features would slow processing down) and aggregate them by stock:

df_stock_id = train.groupby(['stock_id'])[vol_cols].agg(['mean', 'std', 'max', 'min']).reset_index()
df_stock_id.head()

At this point df_stock_id becomes:

[figure: df_stock_id with two-level stat columns]

Join the column index levels and add a suffix:

df_stock_id.columns = ['_'.join(col) for col in df_stock_id.columns]
df_stock_id = df_stock_id.add_suffix('_' + 'stock')
df_stock_id.head()

[figure: df_stock_id with _stock-suffixed columns]

Similarly, grouping by time_id gives df_time_id:

[figure: df_time_id with _time-suffixed columns]

Then everything is merged into the full df, e.g. train.head():

[figure: merged train.head()]

And test.head():

[figure: merged test.head()]

Next, 'trade_seconds_in_bucket_count_unique' is used to build further features:

## compute size_tau for train and test
train['size_tau'] = np.sqrt(1/train['trade_seconds_in_bucket_count_unique'])
test['size_tau'] = np.sqrt(1/test['trade_seconds_in_bucket_count_unique'])

train['size_tau_400'] = np.sqrt(1/train['trade_seconds_in_bucket_count_unique_400'])
test['size_tau_400'] = np.sqrt(1/test['trade_seconds_in_bucket_count_unique_400'])

train['size_tau_300'] = np.sqrt(1/train['trade_seconds_in_bucket_count_unique_300'])
test['size_tau_300'] = np.sqrt(1/test['trade_seconds_in_bucket_count_unique_300'])

train['size_tau_200'] = np.sqrt(1/train['trade_seconds_in_bucket_count_unique_200'])
test['size_tau_200'] = np.sqrt(1/test['trade_seconds_in_bucket_count_unique_200'])

[figure: train with the new size_tau columns]

Likewise, 'trade_order_count_sum' builds new features:

## size_tau2
train['size_tau2'] = np.sqrt(1/train['trade_order_count_sum'])
test['size_tau2'] = np.sqrt(1/test['trade_order_count_sum'])

train['size_tau2_400'] = np.sqrt(0.33/train['trade_order_count_sum'])
test['size_tau2_400'] = np.sqrt(0.33/test['trade_order_count_sum'])

train['size_tau2_300'] = np.sqrt(0.5/train['trade_order_count_sum'])
test['size_tau2_300'] = np.sqrt(0.5/test['trade_order_count_sum'])

train['size_tau2_200'] = np.sqrt(0.66/train['trade_order_count_sum'])
test['size_tau2_200'] = np.sqrt(0.66/test['trade_order_count_sum'])

#delta tau
train['size_tau2_d'] = train['size_tau2_400'] - train['size_tau2']
test['size_tau2_d'] = test['size_tau2_400'] - test['size_tau2']

[figure: train with the size_tau2 columns]

The feature columns now:

colNames = [col for col in list(train.columns)
            if col not in {"stock_id", "time_id", "target", "row_id"}]
len(colNames)  # 194

3. Building Features with KMeans Clustering

In the first part we obtained the feature-engineered train and test dfs. Now KMeans clustering is used to group similar stocks together, and the mean of the features above within each cluster is computed as new features.

from sklearn.cluster import KMeans

train_p = pd.read_csv(data_dir + 'train.csv')
# Pivot train_p with 'time_id' as the index and one column per stock_id
train_p = train_p.pivot(index='time_id', columns='stock_id', values='target')
corr = train_p.corr()  # Pearson correlation between stocks' targets

ids = corr.index

kmeans = KMeans(n_clusters=7, random_state=SEED).fit(corr.values)
print(kmeans.labels_)
==========================================================================
[0 5 3 2 0 0 2 3 1 2 0 5 3 3 0 0 0 2 3 3 3 5 0 0 6 0 0 3 6 3 0 3 3 0 6 6 3
 6 3 0 3 0 3 3 0 5 3 3 3 5 5 6 6 6 2 5 2 3 0 3 3 0 3 0 5 6 2 5 6 5 1 4 6 5
 5 0 2 5 6 6 5 3 0 0 5 2 6 6 0 5 0 3 3 3 3 0 0 6 0 5 0 3 0 5 0 3 0 5 3 5 3
 5]

l = []  # per-cluster lists of stock indices, e.g. l[1] == [8, 80]
for n in range(7):
    l.append([(x - 1) for x in ((ids + 1) * (kmeans.labels_ == n)) if x > 0])

Here the correlation matrix of the pivoted train targets is clustered, and ((ids + 1) * (kmeans.labels_ == n)) is used to recover the list l of stock indices belonging to each cluster (the +1/-1 shift keeps stock id 0 from being dropped by the x > 0 test).
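Assuming ids is simply the integer index of corr, an equivalent but more direct way to build l would be plain boolean indexing (a sketch, not from the original notebook):

l = [list(ids[kmeans.labels_ == n]) for n in range(7)]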

# Use the per-cluster stock indices to build new feature dfs: mat for train, matTest for test
mat = []
matTest = []
n = 0
for ind in l:
    print(ind)
    # Select the rows whose stock_id falls in this cluster
    newDf = train.loc[train['stock_id'].isin(ind)]
    newDf = newDf.groupby(['time_id']).agg(np.nanmean)
    newDf.loc[:, 'stock_id'] = str(n) + 'c1'  # relabel stock_id as str(n)+'c1', e.g. '0c1'
    mat.append(newDf)

    newDf = test.loc[test['stock_id'].isin(ind)]
    newDf = newDf.groupby(['time_id']).agg(np.nanmean)
    newDf.loc[:, 'stock_id'] = str(n) + 'c1'
    matTest.append(newDf)
    n += 1  # 7 clusters in total
=======================================================================================
[0, 4, 5, 10, 15, 16, 17, 23, 26, 28, 29, 33, 36, 42, 44, 48, 66, 69, 72, 85, 94, 95, 100, 102, 108, 109, 111, 113, 115, 118, 120]
[8, 80]
[3, 6, 9, 18, 61, 63, 75, 86, 97]
[2, 7, 13, 14, 19, 20, 21, 30, 32, 34, 35, 39, 41, 43, 46, 47, 51, 52, 53, 64, 67, 68, 70, 93, 103, 104, 105, 107, 114, 119, 123, 125]
[81]
[1, 11, 22, 50, 55, 56, 62, 73, 76, 78, 83, 84, 87, 90, 96, 101, 112, 116, 122, 124, 126]
[27, 31, 37, 38, 40, 58, 59, 60, 74, 77, 82, 88, 89, 98, 99, 110]

Next, the generated matrices are reshaped:

# Convert the matrices to DataFrames
mat1 = pd.concat(mat).reset_index()
mat1.drop(columns=['target'], inplace=True)

mat2 = pd.concat(matTest).reset_index()
# Not entirely clear why the test df is padded with mat1's time_id == 5 rows
mat2 = pd.concat([mat2, mat1.loc[mat1.time_id == 5]])

mat1 = mat1.pivot(index='time_id', columns='stock_id')
mat1.columns = ['_'.join(x) for x in mat1.columns.ravel()]  # flatten the column names
mat1.reset_index(inplace=True)

mat2 = mat2.pivot(index='time_id', columns='stock_id')
mat2.columns = ['_'.join(x) for x in mat2.columns.ravel()]
mat2.reset_index(inplace=True)

After mat1 = mat1.pivot(index='time_id', columns='stock_id') above, mat1 looks like:

[figure: pivoted mat1 with two-level columns]

Then join the column names with mat1.columns = ['_'.join(x) for x in mat1.columns.ravel()]:

[figure: mat1 with flattened column names]

Then a subset of features is selected and merged in:

train = pd.merge(train, mat1[nnn], how='left', on='time_id')  # nnn is the list of selected feature columns
test = pd.merge(test, mat2[nnn], how='left', on='time_id')

That is all of the feature engineering; I am truly impressed by alexioslyon's skill. From here you can use LightGBM, NN, CNN and other models to predict the target. You can save train and test with train.to_pickle('path') for convenient reuse next time.
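For example, a minimal save/reload sketch (the file names are illustrative):

train.to_pickle('train_features.pkl')
test.to_pickle('test_features.pkl')

# in a later session
train = pd.read_pickle('train_features.pkl')
test = pd.read_pickle('test_features.pkl')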

References

[1] optiver-realized-volatility-prediction

[2] How does the order-matching mechanism of stock exchanges actually work?

[3] Optiver Realized Volatility Prediction